use nom::{IResult, InputTakeAtPosition};
use nom::bytes::streaming::tag;
use nom::character::streaming::space1;
use nom::character::complete::space0;
use nom::character::complete::char;
use std::collections::HashSet;
use nom::Err;
use nom::error::{ParseError, ErrorKind};
use nom::character::is_alphanumeric;
use nom::bytes::complete::escaped;
use nom::combinator::opt;
use nom::sequence::tuple;
use nom::character::complete::one_of;
use nom::multi::many0;

lazy_static! {
    /// Reserved SQL words that must not be accepted as identifiers.
    ///
    /// The set is built once, on first access. Note that `group by` and
    /// `order by` are stored as two-word entries.
    static ref KEY_WORDS: HashSet<&'static str> =
        ["select", "from", "where", "group by", "order by", "as"]
            .iter()
            .cloned()
            .collect();
}

/// A parsed `select ... from ...` statement.
///
/// All string data is borrowed from the parsed input (lifetime `'a`).
#[derive(Debug, Eq, PartialEq)]
struct SelectStatement<'a> {
    /// Items of the select list.
    field: Vec<FieldEnum<'a>>,
    /// Sources named in the `from` clause.
    table: Vec<TableEnum<'a>>
}

/// One item of a select list.
#[derive(Debug, PartialEq, Eq)]
enum FieldEnum<'a> {
    /// A plain, optionally qualified column reference.
    Normal(Field<'a>),
    /// A parenthesised sub-select.
    SubSelect(SubSelect<'a>),
    /// A quoted string literal.
    String(StringField<'a>)
}

/// A plain column reference such as `a.field as f`.
#[derive(Debug, PartialEq, Eq)]
struct Field<'a> {
    /// Qualifier written before the `.` (`a` in `a.field`), if any.
    own: Option<&'a str>,
    /// Column name.
    name: &'a str,
    /// Alias, if any.
    alias: Option<&'a str>
}

impl <'a> Field<'a> {
    /// Creates an unqualified field without an alias.
    fn new(name: &'a str) -> Field {
        Self::new_all(None, name, None)
    }

    /// Creates a field with an optional qualifier and no alias.
    fn new_own(own: Option<&'a str>, name: &'a str) -> Self {
        Self::new_all(own, name, None)
    }

    /// Creates a field with every component given explicitly.
    fn new_all(own: Option<&'a str>, name: &'a str, alias: Option<&'a str>) -> Self {
        Self { own, name, alias }
    }
}

/// A nested select statement used as a select-list item.
#[derive(Debug, Eq, PartialEq)]
struct SubSelect<'a> {
    /// The nested statement; boxed because `SelectStatement` contains
    /// `FieldEnum`, which in turn contains `SubSelect`.
    parser: Box<SelectStatement<'a>>,
    /// Alias of the sub-select.
    alias: Option<&'a str>
}

impl <'a> SubSelect<'a> {
    /// Wraps an already parsed statement together with its alias.
    fn new(parser: SelectStatement<'a>, alias: &'a str) -> Self {
        let boxed = Box::new(parser);
        Self {
            parser: boxed,
            alias: Some(alias)
        }
    }
}

/// A quoted string literal used as a select-list item.
#[derive(Debug, PartialEq, Eq)]
struct StringField<'a> {
    /// Text between the quotes, exactly as written in the input.
    string: &'a str,
    /// Alias, if any.
    alias: Option<&'a str>
}

impl <'a> StringField<'a> {
    /// Creates a string field without an alias.
    fn new(string: &'a str) -> Self {
        Self { string, alias: None }
    }
}

/// A source in the `from` clause.
#[derive(Debug, PartialEq, Eq)]
enum TableEnum<'a> {
    /// A named table with an optional alias.
    Normal(Table<'a>)
    // A sub-select used as a table source is not supported yet:
//    SubSelect(SubSelect<'a>)
}

/// A table reference such as `table1 as a`.
#[derive(Debug, PartialEq, Eq)]
struct Table<'a> {
    /// Table name.
    name: &'a str,
    /// Alias, if any.
    alias: Option<&'a str>
}

impl <'a> Table<'a> {
    /// Creates a table reference without an alias.
    fn new(name: &'a str) -> Self {
        Self::new_all(name, None)
    }

    /// Creates a table reference with an optional alias.
    fn new_all(name: &'a str, alias: Option<&'a str>) -> Self {
        Self { name, alias }
    }
}

fn sql_parser(input: &str) -> IResult<&str, SelectStatement>{
    let (input, _) = tag("select")(input)?;
    let (input, _) = space1(input)?;
    let (input, fes) = many0(field_parser)(input)?;
    let (input, _) = tag("from")(input)?;
    let (input, _) = space1(input)?;
    let (input, ts) = sql_alphanumeric(input)?;
    let (input, _) = space0(input)?;
    let (input, alias) = opt(alias_parser)(input)?;

    let t = Table::new_all(ts, alias);
    let te = TableEnum::Normal(t);

    let ss = SelectStatement {
        field: fes,
        table: vec![te]
    };

    Ok((input, ss))
}

/// Parses one item of the select list, plus an optional trailing `,` and
/// any spaces after it.
///
/// Three kinds of item are handled:
/// 1. string literals, `"a" as s` or `'a' as s` (both quote styles);
/// 2. regular `table.column` references;
/// 3. sub-selects.
///
/// To keep the parser simple, the following is assumed about the input:
/// 1. no arithmetic expressions such as `1+2`;
/// 2. no function calls;
/// 3. parentheses only ever wrap a sub-select, and a sub-select must be
///    wrapped in parentheses and must have an alias.
fn field_parser(input: &str) -> IResult<&str, FieldEnum> {
    // Dispatch on the first character to decide which kind of item follows.
    let (rest, parsed) = match input.chars().next() {
        None => {
            return Err(Err::Error(ParseError::from_error_kind(input, ErrorKind::NoneOf)));
        }
        Some(quote) if quote == '\'' || quote == '"' => {
            let (rest, literal) = string_field_parser(quote)(input)?;
            (rest, FieldEnum::String(literal))
        }
        Some('(') => {
            let (rest, sub) = sub_parser(input)?;
            (rest, FieldEnum::SubSelect(sub))
        }
        Some(_) => {
            let (rest, column) = normal_field_parser(input)?;
            (rest, FieldEnum::Normal(column))
        }
    };

    let (rest, _) = opt(char(','))(rest)?;
    let (rest, _) = space0(rest)?;

    Ok((rest, parsed))
}

/// 子语句解析
fn sub_parser(input: &str) -> IResult<&str, SubSelect> {
    let (input, _) = char('(')(input)?;
    let (input, _) = space0(input)?;
    let (input, ss) = sql_parser(input)?;
    let (input, _) = space0(input)?;
    let (input, _) = char(')')(input)?;
    let (input, _) = space0(input)?;
    let (input, alias) = alias_parser(input)?;

    let ss = SubSelect::new(ss, alias);

    Ok((input, ss))
}


/// 字段值为字符串
fn string_field_parser(c: char) -> impl Fn(&str) -> IResult<&str, StringField> {
    move |input: &str| {
        let (input, _) = char(c)(input)?;
        let s = c.to_string() +"\\";
        let (input, string) = escaped(sql_alphanumeric1, '\\',
                                      one_of(s.as_str()))(input)?;
        let (input, _) = char(c)(input)?;
        let (input, _) = space1(input)?;
        let (input, alias) = opt(alias_parser)(input)?;

        let mut field = StringField::new(string);
        if alias.is_some() {
            field.alias = Some(alias.unwrap());
        }

        Ok((input, field))
    }
}

/// 普通字段解析
/// a.field as f
fn normal_field_parser(input: &str) -> IResult<&str, Field> {
    let (input, t) = opt(tuple((sql_alphanumeric, char('.'))))(input)?;
    let (input, field) = sql_alphanumeric(input)?;
    let (input, _) = space0(input)?;
    let (input, alias) = opt(alias_parser)(input)?;
    let (input, _) = opt(char(','))(input)?;

    let mut field = Field::new(field);
    if t.is_some() {
        let t = t.unwrap();
        field.own = Some(t.0);
    }
    if alias.is_some() {
        field.alias = Some(alias.unwrap());
    }

    Ok((input, field))
}

/// 转换as语句
fn alias_parser(input: &str) -> IResult<&str, &str> {
    let (input, _) = if input.starts_with("as ") {
        let (input, _) = tag("as")(input)?;
        let (input, _) = space1(input)?;
        (input, ())
    } else {
        let (input, _) = space0(input)?;
        (input, ())
    };
    let (input, alias) = sql_alphanumeric1(input)?;
    let (input, _) = space0(input)?;
    Ok((input, alias))
}

/// Parses a possibly empty SQL identifier from the start of `input`.
///
/// An identifier is a run of `0-9`, `a-z`, `A-Z` and `_`. The parser fails
/// with `ErrorKind::IsNot` when the first space-delimited word of `input`
/// is a keyword from `KEY_WORDS`, or when the first two words are
/// `group by` / `order by`.
///
/// Keyword detection is purely space-delimited: a keyword immediately
/// followed by another character (for example `select,`) is not rejected.
fn sql_alphanumeric(input: &str) -> IResult<&str, &str> {
    // Only the first two words matter, so walk the iterator lazily instead
    // of collecting every word of the remaining input on each call.
    let mut words = input.split(' ');

    if let Some(first_word) = words.next() {
        let is_keyword = match (first_word, words.next()) {
            // Two-word keywords: only reserved when `by` follows.
            ("group", Some(second_word)) | ("order", Some(second_word)) => second_word == "by",
            (word, _) => KEY_WORDS.contains(word),
        };

        if is_keyword {
            return Err(Err::Error(ParseError::from_error_kind(input, ErrorKind::IsNot)));
        }
    }

    input.split_at_position_complete(|item| !is_sql_alphanumeric(item))
}

/// Parses a non-empty SQL identifier from the start of `input`.
///
/// An identifier is a run of `0-9`, `a-z`, `A-Z` and `_`. The parser fails
/// with `ErrorKind::IsNot` when no identifier character is present, when
/// the first space-delimited word of `input` is a keyword from
/// `KEY_WORDS`, or when the first two words are `group by` / `order by`.
///
/// Keyword detection is purely space-delimited: a keyword immediately
/// followed by another character (for example `select,`) is not rejected.
fn sql_alphanumeric1(input: &str) -> IResult<&str, &str> {
    // Only the first two words matter, so walk the iterator lazily instead
    // of collecting every word of the remaining input on each call.
    let mut words = input.split(' ');

    if let Some(first_word) = words.next() {
        let is_keyword = match (first_word, words.next()) {
            // Two-word keywords: only reserved when `by` follows.
            ("group", Some(second_word)) | ("order", Some(second_word)) => second_word == "by",
            (word, _) => KEY_WORDS.contains(word),
        };

        if is_keyword {
            return Err(Err::Error(ParseError::from_error_kind(input, ErrorKind::IsNot)));
        }
    }

    input.split_at_position1_complete(|item| !is_sql_alphanumeric(item), ErrorKind::IsNot)
}

/// Returns `true` when `chr` may appear in a SQL identifier, i.e. it is one
/// of `0-9`, `a-z`, `A-Z` or `_`.
///
/// The check is done on the `char` itself. Casting to `u8` first would
/// truncate non-ASCII characters, so that for example U+0161 (low byte
/// `0x61`, ASCII `a`) would wrongly be accepted.
fn is_sql_alphanumeric(chr: char) -> bool {
    chr.is_ascii_alphanumeric() || chr == '_'
}


#[test]
fn test_alias_parser() {
    // Explicit `as` keyword followed by the alias name.
    let parsed = alias_parser("as alias");

    assert_eq!(parsed, Ok(("", "alias")));
}

#[test]
fn test_normal_field_parser() {
    // Unqualified column with an explicit alias and a trailing space.
    let expected = Field::new_all(None, "field", Some("f"));

    let parsed = normal_field_parser("field as f ");

    assert_eq!(parsed, Ok(("", expected)));
}

#[test]
fn test_string_field_parser() {
    // Double-quoted literal containing an escaped quote, with a bare alias.
    let input = "\"field\\\"\" f";

    let mut expected = StringField::new("field\\\"");
    expected.alias = Some("f");

    let parsed: IResult<&str, StringField> = string_field_parser('"')(input);

    assert_eq!(parsed, Ok(("", expected)));
}

#[test]
fn test_sql_parser1() {
    // One qualified column with a bare alias, one table with a bare alias.
    let input = "select a.column c from table a";

    let expected = SelectStatement {
        field: vec![FieldEnum::Normal(Field::new_all(Some("a"), "column", Some("c")))],
        table: vec![TableEnum::Normal(Table::new_all("table", Some("a")))]
    };

    let parsed: IResult<&str, SelectStatement> = sql_parser(input);

    assert_eq!(parsed, Ok(("", expected)))
}

#[test]
fn test_sql_parser2() {
    // Mixes all three field kinds: a column, a string literal, a sub-select.
    let input = "select a.column1, \"select\" s, (select c from table2) as sub from table1 as a";

    // `a.column1`
    let column = FieldEnum::Normal(Field::new_own(Some("a"), "column1"));

    // `"select" s`
    let mut literal_field = StringField::new("select");
    literal_field.alias = Some("s");
    let literal = FieldEnum::String(literal_field);

    // `(select c from table2) as sub`
    let inner = SelectStatement {
        field: vec![FieldEnum::Normal(Field::new("c"))],
        table: vec![TableEnum::Normal(Table::new("table2"))]
    };
    let sub_select = FieldEnum::SubSelect(SubSelect::new(inner, "sub"));

    let expected = SelectStatement {
        field: vec![column, literal, sub_select],
        table: vec![TableEnum::Normal(Table::new_all("table1", Some("a")))]
    };

    let parsed: IResult<&str, SelectStatement> = sql_parser(input);

    assert_eq!(parsed, Ok(("", expected)));
}